
Overview

The RateLimiter class implements a Sliding Window Log algorithm to enforce configurable request limits per time window. This is critical for Steam Web API compliance and preventing account bans.
API Safety: Steam’s Community Market API enforces strict rate limits. Exceeding these limits can result in:
  • 429 Too Many Requests errors
  • Temporary IP bans
  • Permanent account restrictions
The rate limiter guarantees compliance when shared across all schedulers.

Class Definition

RateLimiter.py:5-22
class RateLimiter:
    """
    Rate limiter using Sliding Window Log algorithm.
    Enforces configurable request limits per time window for Steam Web API safety.
    """

    def __init__(self, max_requests: int = 15, window_seconds: float = 60.0):
        """
        Initialize the rate limiter.

        Args:
            max_requests: Maximum requests allowed per window
            window_seconds: Time window in seconds
        """
        self._lock = asyncio.Lock()
        self._timestamps: list[float] = []  # Sliding window log
        self._max_requests = max_requests
        self._window_seconds = window_seconds
State components:

Field            Type          Purpose
_lock            asyncio.Lock  Serializes coroutine access to the timestamp log (asyncio-safe, not OS-thread-safe)
_timestamps      list[float]   Log of request times (Unix timestamps)
_max_requests    int           Maximum allowed requests per window
_window_seconds  float         Window duration in seconds (e.g., 60.0 = 1 minute)

Sliding Window Algorithm

The rate limiter uses a precise sliding window approach, not a fixed window:
Time: ────|────|────|────|────> now
Window:        [←── 60s ──]
          
Only requests in the trailing 60s window count.
Window slides continuously with time.
Advantages:
  • No burst at window boundaries
  • Precise rate limiting
  • Fair distribution
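The boundary-burst problem the sliding window avoids can be shown in a few lines. This is an illustrative sketch with hypothetical helper names, using a limit of 3 requests per 10-second window:

```python
def fixed_window_count(timestamps, now, window):
    """Count requests in the current fixed bucket starting at a window boundary."""
    bucket_start = (now // window) * window
    return sum(1 for ts in timestamps if ts >= bucket_start)

def sliding_window_count(timestamps, now, window):
    """Count requests in the trailing `window` seconds."""
    return sum(1 for ts in timestamps if ts > now - window)

# Three requests just before the t=10 boundary, three just after:
requests = [8.0, 9.0, 9.5, 10.1, 10.2, 10.3]

# The fixed window resets at t=10 and sees only 3 requests at t=10.3,
# letting 6 requests through in 2.3 seconds.
print(fixed_window_count(requests, 10.3, 10))    # 3

# The sliding window sees all 6 in the trailing 10s and would throttle.
print(sliding_window_count(requests, 10.3, 10))  # 6
```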

Core Method: acquire_token()

Every API call must acquire a token before proceeding:
RateLimiter.py:24-49
async def acquire_token(self) -> None:
    """
    Acquire a token to make a request, waiting if necessary to respect rate limits.

    This method ensures that no more than max_requests occur within any window.
    If the limit is reached, it waits until a slot becomes available.
    """
    while True:
        async with self._lock:
            current_time = time.time()
            cutoff_time = current_time - self._window_seconds
            
            # Remove expired timestamps (older than window)
            self._timestamps = [ts for ts in self._timestamps if ts > cutoff_time]

            # Check if we've hit the rate limit
            if len(self._timestamps) >= self._max_requests:
                # Calculate exact wait time until oldest timestamp exits the window
                oldest_timestamp = self._timestamps[0]
                wait_time = self._window_seconds - (current_time - oldest_timestamp)
            else:
                # We have capacity - grant the token
                self._timestamps.append(time.time())
                return

        # Lock is automatically released here when exiting the context manager
        # Wait outside the critical section
        await asyncio.sleep(wait_time)

Algorithm Breakdown

Step 1: Acquire Lock

async with self._lock:
Ensures only one coroutine modifies _timestamps at a time. Critical for correctness.
Step 2: Clean Expired Timestamps

current_time = time.time()
cutoff_time = current_time - self._window_seconds
self._timestamps = [ts for ts in self._timestamps if ts > cutoff_time]
Example:
  • Current time: 1709476800.0 (2024-03-03 14:00:00)
  • Window: 60.0 seconds
  • Cutoff: 1709476740.0 (2024-03-03 13:59:00)
  • Keep only timestamps > cutoff (last 60 seconds)
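The cleanup step can be run standalone with the numbers from the example (the timestamps below are illustrative):

```python
window_seconds = 60.0
current_time = 1709476800.0                  # 2024-03-03 14:00:00
cutoff_time = current_time - window_seconds  # 1709476740.0

# Two requests older than the window, two within it:
timestamps = [1709476700.0, 1709476735.0, 1709476750.0, 1709476799.0]
timestamps = [ts for ts in timestamps if ts > cutoff_time]
print(timestamps)  # [1709476750.0, 1709476799.0] - only the last 60 seconds survive
```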
Step 3: Check Capacity

if len(self._timestamps) >= self._max_requests:
    # RATE LIMITED - calculate wait time
    oldest_timestamp = self._timestamps[0]
    wait_time = self._window_seconds - (current_time - oldest_timestamp)
Wait time calculation:
  • Oldest request was at t = 0
  • Current time is t = 45
  • Window is 60 seconds
  • Wait time = 60 - (45 - 0) = 15 seconds
After 15 seconds, the oldest request will age out of the window.
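The same wait-time arithmetic as plain code, using the numbers above:

```python
window_seconds = 60.0
oldest_timestamp = 0.0  # oldest request still in the window
current_time = 45.0     # now

# Time until the oldest request ages out of the trailing window:
wait_time = window_seconds - (current_time - oldest_timestamp)
print(wait_time)  # 15.0 - at t=60 the oldest request leaves the window
```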
Step 4: Grant Token or Sleep

if capacity_available:
    self._timestamps.append(time.time())  # Record this request
    return  # Grant token immediately
else:
    await asyncio.sleep(wait_time)  # Wait outside the lock
Key insight: Sleep happens outside the lock to avoid blocking other coroutines.

Visual Example

Consider a rate limiter configured for 3 requests per 10 seconds:
rate_limiter = RateLimiter(max_requests=3, window_seconds=10.0)
Timeline:
await rate_limiter.acquire_token()  # t=0.0, instant grant
# _timestamps = [0.0] - capacity 1/3 used

await rate_limiter.acquire_token()  # t=0.0, instant grant
# _timestamps = [0.0, 0.0] - capacity 2/3 used

await rate_limiter.acquire_token()  # t=0.0, instant grant
# _timestamps = [0.0, 0.0, 0.0] - capacity 3/3 used

await rate_limiter.acquire_token()  # blocks ~10s until the oldest timestamp ages out
# _timestamps = [10.0] - granted at t=10.0, capacity 1/3 used again

Usage in API Client

Every Steam API method calls acquire_token() before making a request:
steamAPIclient.py:51-88 (example: fetch_price_overview)
async def fetch_price_overview(
    self,
    appid: int,
    market_hash_name: str,
    currency: int = 1,
    country: str = "US",
    language: str = "english"
) -> PriceOverviewData:
    # CRITICAL: Acquire rate limit token before making request
    await self.rate_limiter.acquire_token()

    url = f"{self.BASE_URL}priceoverview/"
    params = {
        "appid": appid,
        "market_hash_name": market_hash_name,
        "currency": currency,
        "country": country,
        "language": language
    }

    async with self.session.get(url, params=params) as response:
        response.raise_for_status()
        raw_response = await response.json()
        return PriceOverviewData(**raw_response)
Flow:
  1. Scheduler decides to execute an item
  2. Calls steam_client.fetch_price_overview(...)
  3. Client calls await self.rate_limiter.acquire_token()
  4. Rate limiter either:
    • Returns immediately (capacity available)
    • Sleeps until capacity available
  5. HTTP request proceeds
The caller (scheduler) is completely unaware of rate limiting delays. From the scheduler’s perspective, the API call simply takes longer when rate-limited. This separation of concerns keeps scheduling logic simple.

Shared Instance Pattern

The orchestrator creates a single RateLimiter instance and passes it to all schedulers:
cerebro.py:145-153
def setup_schedulers(self):
    # Create single shared rate limiter (CRITICAL for API compliance)
    rate_limit = self.config['LIMITS']['REQUESTS']
    window_seconds = self.config['LIMITS']['WINDOW_SECONDS']
    self.rate_limiter = RateLimiter(max_requests=rate_limit, window_seconds=window_seconds)
    
    # Pass to both schedulers
    self.snoozerScheduler = snoozerScheduler(rate_limiter=self.rate_limiter)
    self.clockworkScheduler = ClockworkScheduler(rate_limiter=self.rate_limiter)
Why sharing is critical:
Orchestrator
└── RateLimiter (max=100)
    ├── snoozerScheduler → uses shared instance
    └── ClockworkScheduler → uses shared instance

Total capacity: 100 req/window ✓
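The arithmetic behind the diagram can be made explicit. A small sketch (values taken from the example above) showing what goes wrong if each scheduler were given its own limiter instead:

```python
max_requests = 100  # the limit the API is assumed to tolerate per window
schedulers = 2      # snoozerScheduler + ClockworkScheduler

# One shared limiter: both schedulers draw from the same budget.
shared_capacity = max_requests                 # 100 req/window - compliant

# Separate limiters: each scheduler gets its own full budget.
separate_capacity = max_requests * schedulers  # 200 req/window - double the limit

print(shared_capacity, separate_capacity)
```

With separate instances, total traffic scales with the number of schedulers and silently exceeds the intended limit, which is why the orchestrator constructs exactly one RateLimiter.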

Thread Safety & Async Safety

The rate limiter is designed for async/concurrent usage:
# Multiple coroutines can call acquire_token() simultaneously
await asyncio.gather(
    rate_limiter.acquire_token(),  # Coroutine 1
    rate_limiter.acquire_token(),  # Coroutine 2
    rate_limiter.acquire_token(),  # Coroutine 3
)
Safety mechanisms:
async with self._lock:
    # Only ONE coroutine can execute this block at a time
    self._timestamps = [ts for ts in self._timestamps if ts > cutoff_time]
Without the lock, two coroutines could:
  1. Both read len(self._timestamps) == 14 (below limit of 15)
  2. Both append a timestamp
  3. Result: 16 timestamps (exceeds limit!)
async with self._lock:
    # Calculate wait_time
    wait_time = ...
# Lock released here

# Sleep OUTSIDE the lock
await asyncio.sleep(wait_time)
If we slept inside the lock, all other coroutines would block during the sleep. This would serialize requests instead of allowing concurrent processing.
self._timestamps = [ts for ts in self._timestamps if ts > cutoff_time]
This creates a new list and assigns it atomically. No risk of partial updates.
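The check-then-act race described above can be forced by hand. A deterministic sketch (the interleaving is simulated, not real concurrency) of what happens without the lock:

```python
max_requests = 15
timestamps = [float(i) for i in range(14)]  # 14 entries - exactly one slot left

# Both coroutines run the capacity check before either appends:
c1_sees_capacity = len(timestamps) < max_requests  # True (14 < 15)
c2_sees_capacity = len(timestamps) < max_requests  # True (14 < 15)

# Both then append, because both believed a slot was free:
if c1_sees_capacity:
    timestamps.append(14.0)
if c2_sees_capacity:
    timestamps.append(15.0)

print(len(timestamps))  # 16 - one over the limit, exactly the bug the lock prevents
```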

Configuration Examples

LIMITS:
  REQUESTS: 100
  WINDOW_SECONDS: 300  # 5 minutes
  # 100 req / 5 min = 20 req/min = 0.33 req/sec
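The conversion in the comment can be checked directly:

```python
requests = 100
window_seconds = 300  # 5 minutes

per_minute = requests / (window_seconds / 60)  # 20.0 req/min
per_second = requests / window_seconds         # ~0.33 req/sec
print(per_minute, round(per_second, 2))
```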
Steam’s actual limits are undocumented and vary by:
  • Account level
  • Authentication method (cookies vs API key)
  • IP reputation
  • Time of day
Start conservative (100 req/5min) and increase gradually while monitoring for 429 errors.

Performance Characteristics

  • Acquire token (capacity available): O(n) where n = requests in window
  • Acquire token (rate limited): O(n) + sleep time
  • Cleanup: O(n) list comprehension
In practice, n is small (typically 15-200), so performance is excellent.
Memory: O(max_requests) - the limiter stores at most max_requests timestamps. Example: a 200-request limit keeps 200 floats, only a few KB of memory.
Lock contention: under high load (many schedulers), coroutines queue at the lock. The critical section is very short (~1 ms), so contention is minimal even with 10+ concurrent schedulers.

Comparison to Alternatives

Algorithm                  Accuracy            Memory  Burst Handling
Sliding Window Log (used)  ✓ Precise           O(N)    ✓ Perfect
Token Bucket               ~ Approximate       O(1)    ⚠ Allows bursts
Leaky Bucket               ✓ Precise           O(1)    ✓ Smooth
Fixed Window               ✗ Burst vulnerable  O(1)    ✗ 2x burst possible
Why Sliding Window Log?
  • Precise rate limiting (no burst vulnerability)
  • Simple implementation (easier to debug)
  • Minimal memory overhead (max 200 timestamps)
  • No drift or accumulation errors
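For contrast, here is a minimal token-bucket sketch (not the project's implementation) illustrating the burst behavior flagged in the table: a full bucket lets an entire window's budget through at once.

```python
import time

class TokenBucket:
    """Minimal token bucket: refills continuously, allows bursts up to capacity."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum stored tokens (= max burst size)
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=0.25, capacity=15)  # 15 req/60s on average
burst = sum(bucket.try_acquire() for _ in range(20))
print(burst)  # 15 - the entire window's budget consumed in an instant
```

A sliding window log with the same parameters would admit the same 15 requests, but afterwards no further bursts are possible until timestamps age out, whereas a token bucket permits a fresh burst whenever it has idled long enough to refill.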

Testing the Rate Limiter

You can test the rate limiter in isolation:
test_rate_limiter.py
import asyncio
import time
from src.RateLimiter import RateLimiter

async def test_rate_limiting():
    limiter = RateLimiter(max_requests=3, window_seconds=5.0)
    
    print("Requesting 5 tokens (limit is 3 per 5 seconds)...")
    
    for i in range(5):
        start = time.time()
        await limiter.acquire_token()
        elapsed = time.time() - start
        print(f"Token {i+1} acquired after {elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(test_rate_limiting())
Expected output:
Requesting 5 tokens (limit is 3 per 5 seconds)...
Token 1 acquired after 0.00s  # Instant
Token 2 acquired after 0.00s  # Instant
Token 3 acquired after 0.00s  # Instant
Token 4 acquired after 5.00s  # Waited for token 1 to age out
Token 5 acquired after 0.00s  # Tokens 1-3 expired together, so a slot was already free

Orchestrator

Learn how the orchestrator creates and shares the rate limiter

Schedulers

See how schedulers use the rate limiter in practice

Configuration

Configure rate limits in config.yaml